package org.iot.spark.processor

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext, Time}
import java.util.Properties

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.iot.spark.enity.POITrafficData
import org.iot.spark.tools.HbaseTools
import org.iot.spark.util.PropertFileReader
import org.iot.spark.vo.{IotData, POIData}


object IotDataProcessor {

  /**
   * Entry point for the IoT streaming job.
   *
   * Reads vehicle telemetry from Kafka (starting offsets restored from HBase),
   * keeps only one record per vehicle per 5-second batch, tracks which vehicles
   * were already seen within the last hour via `mapWithState`, and feeds the
   * newly-appeared vehicles into the traffic aggregations. After each batch the
   * processed offset ranges are written back to HBase so a restart resumes
   * exactly where the previous run stopped.
   */
  def main(args: Array[String]): Unit = {

    val prop: Properties = PropertFileReader.readPropertyFile()
    val conf = new SparkConf()
      .setAppName(prop.getProperty("com.iot.spark.app.name"))
      .setMaster(prop.getProperty("com.iot.spark.master"))
    // Pull at most 20 records per second from each Kafka partition (backpressure cap).
    conf.set("spark.streaming.kafka.maxRatePerPartition", "20")
    // Kryo serialization is faster and more compact than Java serialization.
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ssc = new StreamingContext(conf, Seconds(5))

    val brokers = prop.getProperty("kafka.broker")
    val groupId = prop.getProperty("kafka.consumer") // consumer-group name
    val topicsSet = Array(prop.getProperty("com.iot.data.topics"))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "group.id" -> groupId,
      "fetch.message.max.bytes" -> "209715200",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> "org.iot.spark.util.IotDataDecode", // custom deserializer producing IotData
      "enable.auto.commit" -> "false" // offsets are committed manually to HBase (see foreachRDD below)
    )

    ssc.checkpoint(prop.getProperty("com.iot.spark.checkpoint.dir")) // required by mapWithState

    // Restore starting offsets from HBase and subscribe to every topic matching "*data-event".
    val originalStream: InputDStream[ConsumerRecord[String, IotData]] =
      HbaseTools.getStreamingContextFromHBase(ssc, kafkaParams, topicsSet, groupId, "(.*)data-event")

    // Extract the deserialized IotData payloads.
    val nofilterStream: DStream[IotData] = originalStream.map(_.value())

    // Deduplicate within the batch: keep a single record per vehicle id.
    val filterStream: DStream[(String, IotData)] =
      nofilterStream
        .map(iot => (iot.getVehicleId, iot))
        .reduceByKey((first, _) => first)

    // State = "this vehicle was already seen within the timeout window"; the
    // mapped value is (record, alreadySeen).
    //
    // BUG FIX: Spark also invokes this function with iot == None when a key
    // times out; the previous code called iot.get unconditionally, which would
    // throw NoSuchElementException on every state expiry.
    val stateSpec = StateSpec.function(
      (currentBatchTime: Time, key: String, iot: Option[IotData], currentState: State[Boolean]) => {
        iot match {
          case Some(data) =>
            val alreadySeen = currentState.exists()
            if (!alreadySeen) {
              currentState.update(true) // remember this vehicle for subsequent batches
            }
            Some((data, alreadySeen))
          case None =>
            // Timeout callback for an expired key: nothing to emit. The state
            // must not be updated here — it is being removed by Spark.
            None
        }
      }
    ).timeout(Seconds(3600)) // keep per-vehicle state for one hour only
    val OneHourAddStream = filterStream.mapWithState(stateSpec)

    // Keep only vehicles NOT seen before, i.e. newly appeared within the hour.
    // (The original `.map(tuple => tuple)` was a no-op and has been removed;
    // `!_._2` replaces the boxed `_._2.equals(false)` comparison.)
    val OneHourAddFilterStream: DStream[(IotData, Boolean)] = OneHourAddStream.filter(!_._2)

    val OneHourIotStream: DStream[IotData] = OneHourAddFilterStream.map(_._1)

    OneHourIotStream.cache() // reused by the three aggregations below

    IotTrafficDataProcessor.processTotalTrafficData(OneHourIotStream)  // total traffic volume
    IotTrafficDataProcessor.processWindowTrafficData(OneHourIotStream) // windowed traffic volume

    // Point of interest: small trucks on Route-37 within a 30 km radius.
    val poiData = POIData(33.877495, -95.50238, 30)
    val broadcastPOIValues = ssc.sparkContext.broadcast((poiData, "Route-37", "Small Truck"))
    IotTrafficDataProcessor.processPOIData(OneHourIotStream, broadcastPOIValues)

    // Manual offset commit: persist the end offset of every processed range to
    // HBase (instead of Kafka's __consumer_offsets via commitAsync), matching
    // the enable.auto.commit=false setting above.
    originalStream.foreachRDD { eachRdd =>
      val offsetRanges: Array[OffsetRange] = eachRdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { range =>
        HbaseTools.saveBatchOffset(groupId, range.topic, range.partition.toString, range.untilOffset)
      }
    }

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}